iT邦幫忙

2024 iThome 鐵人賽

DAY 5
0
Software Development

用 NestJS 闖蕩微服務!系列 第 5

[用NestJS闖蕩微服務!] DAY05 - MQTT Transporter

  • 分享至 

  • xImage
  •  

什麼是 MQTT ?

MQTT Logo

圖片來源

MQTT 是一個基於 TCP/IP、極輕量的傳輸協定,具有可靠性高、擴展性高、占用頻寬小等特性,主要用於 物聯網(Internet of Things, IoT) 設備的訊息傳遞。

MQTT 採用 Publish/Subscribe 機制來交換訊息,會有 Subscriber、Publisher 與 Message Broker 這三個角色,只要 Subscriber 訂閱 Publisher 指定的 主題(Topic),當 Publisher 發送了一則該 Topic 的訊息時,Message Broker 會將訊息轉發給 Subscriber。

MQTT Publish/Subscribe Concept

Topic

MQTT Topic 是 Publisher 與 Subscriber 溝通的介面,格式為 UTF-8 編碼的字串,需特別注意,總長度不得超過 65535 位元。它有一項特色是 階層(Level) 的概念,使用 斜線(/) 來表示資訊層級。下方是幾個 MQTT Topic 範例:

  • home/bedroom1/light
  • home/bedroom1/tv
  • home/bedroom2/audio
  • home/bedroom2/light

MQTT Topic 的 Level 設計是有意義的,試想如果 Subscriber 要訂閱某個資源下的所有資源,不太可能逐一訂閱,以上方 MQTT Topic 為例,如果要訂閱 home/bedroom1 下所有設備的資訊,就要訂閱 home/bedroom1/lighthome/bedroom1/tv,那如果home/bedroom1 下有上百種設備,用這種逐一訂閱的方式就顯得不切實際,所以 MQTT Topic 針對 Subscriber 設計了一些 萬用字元(Wildcards),讓 Subscriber 可以透過它們來減少逐一訂閱的成本。下方是使用 Whidcards 的 Topic:

  • home/+/light
  • home/bedroom1/#

會發現有 +# 兩種特殊符號,它們的用途如下:

  • +:該字元的階層可以是任何字元,以 home/+/light 來說,home/bedroom1/lighthome/bedroom2/light 都符合篩選條件。
  • #:該字元的階層與下方所有階層可以是任何字元,以 home/bedroom1/# 來說,home/bedroom1/home/bedroom1/tv 都符合篩選條件。

QoS

MQTT 定義了三種 服務品質(Quality of Service) 來根據不同環境、資訊重要程度等因素調整傳輸品質,分別是 QoS0QoS1 以及 QoS2

QoS0

QoS0 的品質下,訊息 最多傳輸一次,並不保證訊息會送達,這樣的好處是不用花費資源去檢查是否有正確送達,所以是三種品質中 最快的;但壞處就是遇到網路不穩、斷線等情況也不會因此重新發送訊息,造成 訊息遺失

QoS0 Concept

QoS1

QoS1 的品質下,訊息 至少傳輸一次,保證訊息會送達,那是怎麼做到保證送達的呢?以 Publisher 與 Message Broker 來說,在 Publisher 送出訊息後,會等待 Message Broker 回覆以確認該訊息有正確送達,假如 Publisher 遲遲收不到回覆的話,將會再傳送一次,但這也就衍伸了 重複發送 的問題,因為無法保證 Message Broker 回覆的時候網路是穩定的。

QoS1 Concept

QoS2

QoS2 的品質下,訊息 確實傳輸一次,保證訊息會送達且不會重複發送,是三種品質中 最安全的,那是怎麼做到的呢?同樣以 Publisher 與 Message Broker 來說,在 Publisher 送出訊息後,會等待 Message Broker 回覆以確認訊息有正確送達,當 Publisher 收到回覆時,會再將 釋放訊息(Publish Release, PUBREL) 發送給 Message Broker,當 Message Broker 收到後,會將訊息發送給 Subscriber,並發送 發佈完成訊息(Publish Complete, PUBCOMP) 給 Publisher,此時 Publisher 會將暫存的訊息清除,完成整個傳輸過程。可以看到這整個過程需要較多的傳輸步驟,所以它是三種品質中 最慢的

QoS2 Concept

安裝 MQTT Message Broker

市面上有許多 MQTT Message Broker,像是:MosquittoEMQXNanoMQ等,每一種都有其特點,在生產環境下可以選擇較適合的解決方案。這邊我們選擇使用由 Eclipse 基金會維護的 Mosquitto,它的特色是非常輕量、可以跨平台使用且很容易上手,非常適合作為入門的 MQTT Message Broker。

補充:此篇教學並不會很詳細解說 Mosquitto 的細節,主要只是使用 Mosquitto 當作 MQTT Message Broker,讓我們可以基於它來完成後續的教學,並從中實際體驗 MQTT 的運作方式,如果對 Mosquitto 有興趣的話,可以參考 官方文件,或是線上其他資源。

Mosquitto Logo

圖片來源

在終端機輸入下方指令以便從 Docker Hub 下載 Mosquitto 的 Docker Image:

$ docker pull eclipse-mosquitto

為了方便我們體驗 Mosquitto 的基礎用法,就不針對驗證的部分做處理,但 Mosquitto 在第二版之後預設不允許匿名連線,所以需要將相關參數打開,透過 Docker 相關指令可以讓我們抽換內部檔案,進而調整設定內容,先在本機產生名稱為 mosquitto.conf 的設定檔:

$ mkdir <FOLDER>
$ touch <FOLDER>/mosquitto.conf

接著,調整 mosquitto.conf 的內容,將 allow_anonymous 設為 true,並設定 listener1883,表示要將服務掛在 1883 port,最後再將 protocol 設定為 mqtt

allow_anonymous true

listener 1883
protocol mqtt

透過下方指令將 Mosquitto 架設在 1883 port,並指定要抽換的檔案路徑:

$ docker run --name <NAME> -p 1883:1883 -v /<FOLDER>:/mosquitto/config -d eclipse-mosquitto

接著,我們可以進入該 Container 下指令,進而使用 Mosquitto 的相關指令來操作 Mosquitto。透過下方指令執行 Container 內的 shell:

$ docker exec -it <NAME> sh

如此一來,便可以透過 Mosquitto Command 來操作 Mosquitto。透過下方指令可以顯示幫助清單:

$ mosquitto -h

操作 Mosquitto

在讓兩個服務透過該機制進行交換訊息之前,可以先試著用 Mosquitto Command 來操作,首先,打開兩個終端機並進入 Container 中的 shell,在其中一個終端機輸入下方指令來訂閱名稱為 home/bedroom1/light 的 Topic:

$ mosquitto_sub -v -t home/bedroom1/light

補充-v 的用途是顯示所有詳細資訊,-t 即 Topic。

Mosquitto Basic Subscription

接著,透過另一個終端機輸入下方指令發送 Topic 為 home/bedroom1/light 的訊息:

$ mosquitto_pub -t home/bedroom1/light -m "Hello World"

Mosquitto Basic Publisher

此時 Subscriber 會收到訊息:

Mosquitto Basic Subscription Result

接著,來測試一下使用 Wildcards 進行訂閱,透過下方指令訂閱符合 home/+/light 的 Topic:

$ mosquitto_sub -v -t home/+/light

此時發送 Topic 為 home/bedroom2/light 的訊息:

$ mosquitto_pub -t home/bedroom2/light -m "Hello World"

Subscriber 會順利收到發送的訊息:

Mosquitto Filter Subscription Result

再來測試一下另一種 Wildcards,透過下方指令訂閱符合 home/bedroom1/# 的 Topic:

$ mosquitto_sub -v -t home/bedroom1/#

此時發送 Topic 為 home/bedroom1/light 的訊息:

$ mosquitto_pub -t home/bedroom1/light -m "Hello World"

Subscriber 會順利收到發送的訊息:

Mosquitto Filter Subscription Result2

MQTT Transporter

NestJS 實作了 MQTT Transporter,讓微服務應用程式可以用跟其他 Transporter 一樣的開發風格來使用 MQTT,如果有物聯網需求或是在網路較嚴苛的情況下,使用 MQTT 會是不錯的選擇。

前置作業

要使用 MQTT Transporter 之前,需要先安裝下方套件:

$ npm install mqtt

補充:mqtt 是一套用於 Node.js 或瀏覽器的 MQTT 客戶端,有興趣可以參考 官方文件

建立微服務應用程式

修改載入點 main.ts 的內容,將 transport 設定為 Transport.MQTT,並根據架設的 MQTT Message Broker 位址來設定 url 的資訊,url 會以 mqtt:// 開頭:

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.MQTT,
      options: {
        url: 'mqtt://localhost:1883',
      }
    },
  );
  await app.listen();
}
bootstrap();

上方範例在 options 只使用了 url,事實上,MQTT Transporter 的 optionsmqttoptions,有興趣的話可以參考 mqtt 官方文件

Transporter 訊息模式

MQTT Transporter 也支援 Request-response 與 Event-based 訊息模式。下方是範例程式碼,在 AppController 內實作 sayHello 方法並套用 @MessagePattern 裝飾器,以及實作 listenDeviceEvent 方法並套用 @EventPattern 裝飾器:

import { Controller } from '@nestjs/common';
import { EventPattern, MessagePattern } from '@nestjs/microservices';

@Controller()
export class AppController {
  @MessagePattern({ cmd: 'hello' })
  sayHello(data: string) {
    console.log(data);
    return `Hello, ${data}`;
  }

  @EventPattern('home/+/devices/#')
  listenDeviceEvent(device: { name: string; }) {
    console.log(device);
  }
}

使用 Mosquitto Command 來進行測試,透過下方指令發送 Topic 為 home/bedroom1/devices/light 的訊息:

$ mosquitto_pub -t home/bedroom1/devices/light -m '{ "name": "led" }'

此時在微服務應用程式的終端機會顯示發送的訊息,因為 Topic 有符合訂閱的篩選條件:

NestJS MQTT Transporter Test Result1

Pattern 設定為可序列化物件的話,要如何透過 Mosquitto Command 來發送訊息呢?這部分的處理方式跟 Redis CLI 是相同的,將該可序列化物件轉成 JSON 字串即可,以上方程式碼來說,我們要將 { cmd: 'hello' } 轉成字串 '{"cmd":"hello"}' 當作 Topic 名稱:

$ mosquitto_pub -t '{"cmd":"hello"}' -m "Hello World"

此時在微服務應用程式的終端機會顯示發送的訊息:

NestJS MQTT Transporter Test Result2

取得 Payload 與 Context

假如要取得該請求的相關資訊,比如:Topic 名稱,可以透過 @Ctx 裝飾器取得 MqttContext。下方是範例程式碼:

import { Controller } from '@nestjs/common';
import {
  Ctx,
  EventPattern,
  MessagePattern,
  Payload,
  MqttContext
} from '@nestjs/microservices';

@Controller()
export class AppController {
  @EventPattern('home/+/devices/#')
  listenDeviceEvent(
    @Payload() device: { name: string },
    @Ctx() ctx: MqttContext
  ) {
    console.log(ctx.getTopic());
    console.log(device);
  }
}

使用 Mosquitto Command 進行測試,透過下方指令發送 Topic 為 home/bedroom1/devices/light 的訊息:

$ mosquitto_pub -t home/bedroom1/devices/light -m '{ "name": "led" }'

此時在微服務應用程式的終端機會顯示發送的訊息以及 Topic 名稱:

NestJS MQTT Transporter Test Context Result

建立客戶端

修改 AppModule 的內容,透過 ClientsModule 建立 MQTT Transporter 的 ClientProxy

import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
// ...

@Module({
  // ...
  imports: [
    ClientsModule.register([
      {
        name: 'MQTT_SERVICE',
        transport: Transport.MQTT,
        options: {
          url: 'mqtt://localhost:1883',
        }
      }
    ])
  ]
})
export class AppModule {}

傳送訊息

MQTT Transporter 也支援 Request-response 與 Event-based 訊息模式,所以可以使用 ClientProxysendemit 方法。下方是範例程式碼,修改 AppController 的內容,使用 @Inject 裝飾器注入 ClientProxy,並設計 getHelloemitDeviceEvent 方法:

import { Inject, Controller, Get } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';

@Controller()
export class AppController {
  constructor(
    @Inject('MQTT_SERVICE') private readonly mqttService: ClientProxy
  ) {}

  @Get()
  getHello() {
    return this.mqttService.send({ cmd: 'hello' }, 'HAO');
  }

  @Get('emitDeviceEvent')
  emitDeviceEvent() {
    return this.mqttService.emit('home/bedroom1/devices/light', { name: 'led' });
  }
}

透過 Postman 使用 GET 方法存取 http://localhost:3000,會看到下方的結果:

NestJS MQTT Client Proxy Result1

透過 Postman 使用 GET 方法存取 http://localhost:3000/emitDeviceEvent,在微服務應用程式的終端機會看到 { name: 'led' }

NestJS MQTT Client Proxy Result2

MQTT Record Builder

在某些情境下,可能會需要根據傳遞的訊息內容來調整相關配置,如:QoS、額外資訊等,針對這種情況,NestJS 設計了 Record Builder 這個角色,讓我們可以針對不同種 Transporter 來建立適合該 Transporter 的配置。以下方程式碼為例,透過 MqttRecordBuilder 設置 QoS 為 2,並將建立好的 Record 當作資料讓 ClientProxy 來處理:

import { Inject, Controller, Get } from '@nestjs/common';
import { ClientProxy, MqttRecordBuilder } from '@nestjs/microservices';

@Controller()
export class AppController {
  constructor(
    @Inject('MQTT_SERVICE') private readonly mqttService: ClientProxy
  ) {}

  @Get()
  getHello() {
    const record = new MqttRecordBuilder('HAO')
      .setQoS(2)
      .build();
    return this.mqttService.send({ cmd: 'hello' }, record);
  }
}

Request-response 的實現原理

由於 MQTT 採用 Publish/Subscribe 機制,故需要透過兩個 Topic 來完成 Request-response 的運作,NestJS 會讓微服務應用程式訂閱指定 Topic 名稱的訊息,比如:devices/light,在回覆訊息時,會傳送 Topic 為 devices/light/reply 的訊息,NestJS 客戶端會訂閱該 Topic 以獲取回應。

Request-response in MQTT Concept

補充:針對 Publish/Subscribe 機制,NestJS 都是採用相同的概念來處理 Request-response,差異只在於實現的方法,像上一篇 Redis Pub/Sub 是用兩個通道來實現的,而 MQTT 則是兩個 Topic。

小結

MQTT 是一種輕量且可靠的傳輸協定,它採用 Publish/Subscribe 模式,包括 Publisher、Subscriber 和 Message Broker 三個角色,以 Topic 作為溝通的介面。Topic 支援階層結構,並且可以使用 +# 來簡化訂閱的方式。MQTT 還定義了三種 QoS 等級,以滿足不同環境下的需求。

NestJS 提供了 MQTT Transporter 讓我們可以輕鬆地在微服務架構中使用 MQTT 來進行通訊,並且與其他 Transporter 有相同的開發風格,縱使轉換成其他的 Transporter 也不會有太大的改動。針對不同訊息採用不同配置的情況,NestJS 設計了 Record Builder 這個角色把特定的配置方式封裝起來,盡可能保持一致的開發體驗,以 MQTT 來說,可以透過 MqttRecordBuilder 來調整 MQTT 相關的配置,如:QoS。


上一篇
[用NestJS闖蕩微服務!] DAY04 - Redis Transporter
下一篇
[用NestJS闖蕩微服務!] DAY06 - NATS Transporter
系列文
用 NestJS 闖蕩微服務!21
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言